{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8a9bcefe",
   "metadata": {},
   "source": [
    "## Python Version\n",
    "\n",
    "* Checking Python Version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "37025861",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "3.8.16\n"
     ]
    }
   ],
   "source": [
    "from platform import python_version\n",
    "print(python_version())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "15522e0b",
   "metadata": {},
   "source": [
    "## Working with Sessions - Multiple Ways\n",
    "\n",
    "*  Section delineates connecting to a snowflake session\n",
    "* Explains session connection through default and private key methdology"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "e5f3a766",
   "metadata": {},
   "outputs": [],
   "source": [
    "from snowflake.snowpark import Session\n",
    "from snowflake.snowpark.functions import col\n",
    "import configparser\n",
    "import os\n",
    "\n",
    "# Default Method\n",
    "connection_parameters = {\n",
    "    \"account\": \"<SNOWFLAKE_ACCOUNT>\",\n",
    "    \"user\": \"<SNOWFLAKE_USER>\",\n",
    "    \"password\": \"<SNOWFLAKE_PASSWORD>\",\n",
    "    \"role\": \"<SNOWFLAKE_ROLE>\",\n",
    "    \"warehouse\": \"<SNOWFLAKE_WAREHOUSE>\",  # optional\n",
    "    \"database\": \"<SNOWFLAKE_DATABASE>\",  # optional\n",
    "    \"schema\": \"<SNOWFLAKE_SCHEMA>\" # optional\n",
    "  }\n",
    "\n",
    "\n",
    "\n",
    "# Loading Credentials From Environmental Variables\n",
    "connection_parameters = {\n",
    "    \"account\": os.getenv('SNOWFLAKE_ACCOUNT'),\n",
    "    \"user\": os.getenv('SNOWFLAKE_USER'),\n",
    "    \"password\": os.getenv('SNOWFLAKE_PASSWORD'),\n",
    "    \"role\": os.getenv('SNOWFLAKE_ROLE'),\n",
    "    \"warehouse\": os.getenv(\"SNOWFLAKE_WAREHOUSE\"),  # optional\n",
    "    \"database\": os.getenv(\"SNOWFLAKE_DATABASE\"),  # optional\n",
    "    \"schema\": os.getenv(\"SNOWFLAKE_SCHEMA\") # optional\n",
    "  }\n",
    "\n",
    "\n",
    "\n",
    "# Loading Credential From Config File\n",
    "snowflake_credentials_file = '../snowflake_creds.config'\n",
    "config = configparser.ConfigParser()\n",
    "config.read(snowflake_credentials_file)\n",
    "connection_parameters = dict(config['default'])\n",
    "\n",
    "\n",
    "session = Session.builder.configs(connection_parameters).create()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "ab45e220",
   "metadata": {},
   "source": [
    "### Key Pair Authentication (Optional)\n",
    "\n",
    "* Explains session connection through Private key setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "818d0ac7",
   "metadata": {},
   "outputs": [],
   "source": [
    "## Import packages with which to parse the private key\n",
    "from cryptography.hazmat.backends import default_backend\n",
    "from cryptography.hazmat.primitives import serialization\n",
    "\n",
    "## Define our plain-text private key and passphrase\n",
    "## which in reality would have been ingested from somewhere\n",
    "private_key_plain_text = '''-----BEGIN PRIVATE KEY-----\n",
    "< your private key >\n",
    "-----END PRIVATE KEY-----'''\n",
    "\n",
    "private_key_passphrase = '<your private key passphase>'\n",
    "\n",
    "def load_private_key(private_key_plain_text, private_key_passphrase):\n",
    "    ## Encode private key\n",
    "    private_key_encoded = private_key_plain_text.encode()\n",
    "\n",
    "    ## Encode private key passphrase\n",
    "    private_key_passphrase_encoded = private_key_passphrase.encode()\n",
    "\n",
    "    ## Load the private key, leveraging passphrase if needed\n",
    "    private_key_loaded = serialization.load_pem_private_key(\n",
    "        private_key_encoded,\n",
    "        password = private_key_passphrase_encoded,\n",
    "        backend = default_backend()\n",
    "    )\n",
    "\n",
    "    ## Serialize loaded private key\n",
    "    private_key_serialized = private_key_loaded.private_bytes(\n",
    "        encoding = serialization.Encoding.DER,\n",
    "        format = serialization.PrivateFormat.PKCS8,\n",
    "        encryption_algorithm = serialization.NoEncryption()\n",
    "    )\n",
    "\n",
    "    return private_key_serialized\n",
    "\n",
    "## Define connection parameters\n",
    "connection_parameters = {\n",
    "    \"account\": \"<your snowflake account identifier>\",\n",
    "    \"user\": \"<your snowflake username>\",\n",
    "    \"private_key\": \"<load_private_key(private_key_plain_text, private_key_passphrase)>\",\n",
    "    \"warehouse\": \"<your snowflake warehouse>\",  # optional\n",
    "    \"database\": \"<your snowflake database>\",  # optional\n",
    "    \"schema\": \"<your snowflake schema>\"  # optional\n",
    "}"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "060040e0",
   "metadata": {},
   "source": [
    "### Session Parameters\n",
    "\n",
    "* Printsout Necessary Session Parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f867fca5",
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql(\"CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH WITH WAREHOUSE_SIZE='X-SMALL'\").collect()\n",
    "session.sql(\"CREATE DATABASE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE\").collect()\n",
    "session.sql(\"CREATE SCHEMA IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA\").collect()\n",
    "session.sql(\"CREATE STAGE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_STAGE\").collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "009c573b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Session Current Account: \"eh24472\"\n",
      "Session Current Database: \"SNOWPARK_DEFINITIVE_GUIDE\"\n",
      "Session Current Role: \"ACCOUNTADMIN\"\n",
      "Session Current Schema: \"MY_SCHEMA\"\n",
      "Session Current Warehouse: \"COMPUTE_WH\"\n",
      "Session Current Fully Qualified Schema: \"SNOWPARK_DEFINITIVE_GUIDE\".\"MY_SCHEMA\"\n",
      "Session query history: <snowflake.snowpark.query_history.QueryHistory object at 0x000001B202209310>\n",
      "Session imports: []\n",
      "Session packages: {}\n",
      "Session stage details: @\"SNOWPARK_DEFINITIVE_GUIDE\".\"MY_SCHEMA\".SNOWPARK_TEMP_STAGE_NWJX5KQG0Q\n"
     ]
    }
   ],
   "source": [
    "print(\"Session Current Account:\",session.get_current_account())\n",
    "print(\"Session Current Database:\",session.get_current_database())\n",
    "print(\"Session Current Role:\",session.get_current_role())\n",
    "print(\"Session Current Schema:\",session.get_current_schema())\n",
    "print(\"Session Current Warehouse:\",session.get_current_warehouse())\n",
    "print(\"Session Current Fully Qualified Schema:\",session.get_fully_qualified_current_schema())\n",
    "print(\"Session query history:\",session.query_history())\n",
    "print(\"Session imports:\",session.get_imports())\n",
    "print(\"Session packages:\",session.get_packages())\n",
    "print(\"Session stage details:\",session.get_session_stage())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "fdfd7bce",
   "metadata": {},
   "source": [
    "### Importing Custom Library using External Stage\n",
    "\n",
    "* Delineates How Files Can Be Uploaded To Snowflake Stage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "a491c8b3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(status='MY_STAGE already exists, statement succeeded.')]"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "stage_info = session.sql(\"create or replace stage MY_STAGE\")\n",
    "stage_info.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "3169b725",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'UPLOADED'"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "put_result = session.file.put(\"./worksheets/worksheet_codes/last_name_finder_stage.py\", \"@MY_STAGE/\")\n",
    "put_result[0].status"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "02fed8bd",
   "metadata": {},
   "source": [
    "## Working with Dataframes\n",
    "\n",
    "* This section explains  basic Dataframe I/O operations\n",
    "* Covers Loading data into Table, read and write operations"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "57f780ae",
   "metadata": {},
   "source": [
    "### Creating Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "66cd02cf",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(status='Table SAMPLE_EMPLOYEE_DATA successfully created.')]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "session.sql('CREATE OR REPLACE TABLE SAMPLE_EMPLOYEE_DATA (id INT,name VARCHAR, age INT, email VARCHAR, city VARCHAR,country VARCHAR)').collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "dd233a74",
   "metadata": {},
   "source": [
    "### Loading Data To Table "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "79ff93b4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(number of rows inserted=10)]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "session.sql(\"\"\"\n",
    "    INSERT INTO SAMPLE_EMPLOYEE_DATA VALUES\n",
    "    (1,'John Doe',25,'johndoe@example.com','New York','USA'),\n",
    "    (2, 'Jane Smith',30,'janesmith@example.com','Los Angeles','USA'),\n",
    "    (3, 'Michael Johnson',35,'michaeljohnson@example.com','London','UK'),\n",
    "    (4, 'Sarah Williams',28,'sarahwilliams@example.com','Leeds','UK'),\n",
    "    (5,'David Brown',32,'davidbrown@example.com','Tokyo','Japan'),\n",
    "    (6,'Emily Davis',29,'emilydavis@example.com','Sydney','Australia'),\n",
    "    (7,'James Miller',27,'jamesmiller@example.com','Dallas','USA'),\n",
    "    (8,'Emma Wilson',33,'emmawilson@example.com','Berlin','Germany'),\n",
    "    (9,'Alexander Taylor',31,'alexandertaylor@example.com','Rome','Italy'),\n",
    "    (10,'Olivia Anderson',26,'oliviaanderson@example.com','Melbourne','Australia')\n",
    "    \"\"\").collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "8a89cc15",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(COUNT(*)=10)]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "session.sql(\"SELECT count(*) FROM SAMPLE_EMPLOYEE_DATA\").collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "f8283660",
   "metadata": {},
   "source": [
    "### Reading Data from Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "2649016f",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_subset_row = session.table(\"SAMPLE_EMPLOYEE_DATA\").filter(col(\"id\") == 1) ##  Filters col ID = 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "6715b5a0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------------------------------------------\n",
      "|\"ID\"  |\"NAME\"    |\"AGE\"  |\"EMAIL\"              |\"CITY\"    |\"COUNTRY\"  |\n",
      "------------------------------------------------------------------------\n",
      "|1     |John Doe  |25     |johndoe@example.com  |New York  |USA        |\n",
      "------------------------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_subset_row.show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "24756cbf",
   "metadata": {},
   "source": [
    "### Writing Data to Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "72eee89f",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>ID</th>\n",
       "      <th>NAME</th>\n",
       "      <th>AGE</th>\n",
       "      <th>EMAIL</th>\n",
       "      <th>CITY</th>\n",
       "      <th>COUNTRY</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>John Doe</td>\n",
       "      <td>25</td>\n",
       "      <td>johndoe@example.com</td>\n",
       "      <td>New York</td>\n",
       "      <td>USA</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   ID      NAME  AGE                EMAIL      CITY COUNTRY\n",
       "0   1  John Doe   25  johndoe@example.com  New York     USA"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "snowpark_df = session.write_pandas(df_subset_row.to_pandas(), \"SAMPLE_EMPLOYEE_DATA_SUBSET\", auto_create_table=True)\n",
    "snowpark_df.to_pandas()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8bb502b6",
   "metadata": {},
   "source": [
    "## Working with UDF\n",
    "\n",
    "* Section provides code templates for Snowpark UDF\n",
    "* Includes UDF template, example and execution of UDF"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "589c5a01",
   "metadata": {},
   "source": [
    "### UDF Template"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d430b846",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "##################################################################\n",
    "## Define the function for the UDF\n",
    "def <main Python function name>(<arguments>):\n",
    "  return <function output>\n",
    "\n",
    "# Imports required packages\n",
    "from snowflake.snowpark.types import <specific Snowpark DataType object>\n",
    "\n",
    "# Optional: Import additional packages or files\n",
    "snowpark_session.add_packages('List of native packages in Anaconda Channel')\n",
    "snowpark_session.add_import('Path to Local File')\n",
    "\n",
    "##################################################################\n",
    "## Register UDF in Snowflake\n",
    "snowpark_session.udf.register(\n",
    "    func = <Main Function Name>\n",
    "  , return_type = <Return Type of Snowpark DataType object >\n",
    "  , input_types = <[Input Types of Snowflake DataType object]>\n",
    "  , is_permanent = True\n",
    "  , name = '<UDF name>'\n",
    "  , replace = True\n",
    "  , stage_location = '@<UDF stage name>'\n",
    ")\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "02b9f08b",
   "metadata": {},
   "outputs": [],
   "source": [
    "##################################################################\n",
    "## Define the function for the UDF\n",
    "\n",
    "def last_name_finder(input_name:str):\n",
    "  last_name = input_name.split()[1]\n",
    "  return last_name\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StringType,IntegerType,ArrayType\n",
    "\n",
    "\n",
    "##################################################################\n",
    "## Register UDF in Snowflake\n",
    "test = session.udf.register(\n",
    "    func = last_name_finder\n",
    "  , return_type = StringType()\n",
    "  , input_types = [StringType()]\n",
    "  , is_permanent = True\n",
    "  , name = 'LAST_NAME_FINDER'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8d534634",
   "metadata": {},
   "source": [
    "### Executing UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "33230d29",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "----------------------------------\n",
      "|\"NAME\"            |\"LAST_NAME\"  |\n",
      "----------------------------------\n",
      "|John Doe          |Doe          |\n",
      "|Jane Smith        |Smith        |\n",
      "|Michael Johnson   |Johnson      |\n",
      "|Sarah Williams    |Williams     |\n",
      "|David Brown       |Brown        |\n",
      "|Emily Davis       |Davis        |\n",
      "|James Miller      |Miller       |\n",
      "|Emma Wilson       |Wilson       |\n",
      "|Alexander Taylor  |Taylor       |\n",
      "|Olivia Anderson   |Anderson     |\n",
      "----------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "session.sql('''SELECT\n",
    "    NAME\n",
    "  , LAST_NAME_FINDER(NAME) AS LAST_NAME\n",
    "FROM SAMPLE_EMPLOYEE_DATA\n",
    "''').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "454349f9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "---------------------------------------------------------------------------------------------------------\n",
      "|\"ID\"  |\"NAME\"            |\"AGE\"  |\"EMAIL\"                      |\"CITY\"       |\"COUNTRY\"  |\"LAST_NAME\"  |\n",
      "---------------------------------------------------------------------------------------------------------\n",
      "|1     |John Doe          |25     |johndoe@example.com          |New York     |USA        |Doe          |\n",
      "|2     |Jane Smith        |30     |janesmith@example.com        |Los Angeles  |USA        |Smith        |\n",
      "|3     |Michael Johnson   |35     |michaeljohnson@example.com   |London       |UK         |Johnson      |\n",
      "|4     |Sarah Williams    |28     |sarahwilliams@example.com    |Leeds        |UK         |Williams     |\n",
      "|5     |David Brown       |32     |davidbrown@example.com       |Tokyo        |Japan      |Brown        |\n",
      "|6     |Emily Davis       |29     |emilydavis@example.com       |Sydney       |Australia  |Davis        |\n",
      "|7     |James Miller      |27     |jamesmiller@example.com      |Dallas       |USA        |Miller       |\n",
      "|8     |Emma Wilson       |33     |emmawilson@example.com       |Berlin       |Germany    |Wilson       |\n",
      "|9     |Alexander Taylor  |31     |alexandertaylor@example.com  |Rome         |Italy      |Taylor       |\n",
      "|10    |Olivia Anderson   |26     |oliviaanderson@example.com   |Melbourne    |Australia  |Anderson     |\n",
      "---------------------------------------------------------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from snowflake.snowpark.functions import col,call_udf\n",
    "df = session.table(\"SAMPLE_EMPLOYEE_DATA\")\n",
    "df.with_column(\"last_name\",call_udf(\"LAST_NAME_FINDER\", col(\"name\"))).show() "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "fdd15431",
   "metadata": {},
   "source": [
    "## Working with UDTF\n",
    "\n",
    "* Section provides code templates for Snowpark UDTF\n",
    "* Includes UDTF template, example and execution of UDTF"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "87985299",
   "metadata": {},
   "source": [
    "### UDTF Template"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "93fa0632",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "\n",
    "##################################################################\n",
    "## Define the class for the UDTF\n",
    "class <name of main Python class> :\n",
    "  \n",
    "  def __init__(self) :\n",
    "    # Python code at the partition level\n",
    "  \n",
    "  def process(self, <arguments>) :\n",
    "    # Python code at the row level\n",
    "    yield (<col_1_val_1>, <col_2_val_1>, ...)\n",
    "    yield (<col_1_val_2>, <col_2_val_2>, ...)\n",
    "\n",
    "    # or\n",
    "\n",
    "    return [\n",
    "        (<col_1_val_1>, <col_2_val_1>, ...)\n",
    "      , (<col_1_val_2>, <col_2_val_2>, ...)\n",
    "    ]\n",
    "  \n",
    "  \n",
    "  def end_partition(self) :\n",
    "    # Python code at the partition level\n",
    " \n",
    "    yield (<col_1_val_1>, <col_2_val_1>, ...)\n",
    "    yield (<col_1_val_2>, <col_2_val_2>, ...)\n",
    "    # or \n",
    "    return [\n",
    "        (<col_1_val_1>, <col_2_val_1>, ...)\n",
    "      , (<col_1_val_2>, <col_2_val_2>, ...)\n",
    "    ]\n",
    "\n",
    "# Import data types for defining the tabular output structure\n",
    "from snowflake.snowpark.types import StructType, StructField\n",
    "\n",
    "snowpark_session.add_packages('List of native packages in Anaconda Channel')\n",
    "snowpark_session.add_import('Path to Local File')\n",
    "\n",
    "##################################################################\n",
    "## Register UDTF in Snowflake\n",
    "snowpark_session.udtf.register(\n",
    "    handler = <Python Class Name>\n",
    "  , output_schema = StructType(<list of field name and Snowpark DataType objects - StructField objects>)\n",
    "  , input_types = <[Input Types of Snowflake DataType object]>\n",
    "  , is_permanent = True\n",
    "  , name = '<UDTF name>'\n",
    "  , replace = True\n",
    "  , stage_location = '@<UDTF stage name>'\n",
    ")\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 158,
   "id": "714ec2d9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<snowflake.snowpark.udtf.UserDefinedTableFunction at 0x2bb83956a90>"
      ]
     },
     "execution_count": 158,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "##################################################################\n",
    "## Define the class for the UDTF\n",
    "\n",
    "# Define handler class\n",
    "class CalculateAverage:\n",
    "  def __init__(self) :\n",
    "    self._values = []\n",
    "\n",
    "  def process(self, input_measure: int) :\n",
    "    self._values.append(input_measure)\n",
    "\n",
    "  def end_partition(self) :\n",
    "    values_list = self._values\n",
    "    average = sum(values_list) / len(values_list)\n",
    "    yield(average ,)\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StructType, StructField\n",
    "from snowflake.snowpark.types import FloatType,IntegerType,StringType\n",
    "\n",
    "### Define output schema\n",
    "output_schema = StructType([\n",
    "      StructField(\"Avg_Age\", FloatType())\n",
    "  ])\n",
    "\n",
    "##################################################################\n",
    "## Register UDTF in Snowflake\n",
    "session.udtf.register(\n",
    "    handler = CalculateAverage\n",
    "  , output_schema = output_schema\n",
    "  , input_types = [IntegerType()]\n",
    "  , is_permanent = True\n",
    "  , name = 'AVERAGE_AGE'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "9c013b37",
   "metadata": {},
   "source": [
    "### Executing UDTF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 159,
   "id": "c67caa68",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "----------------------------------\n",
      "|\"COUNTRY\"  |\"AVG_AGE\"           |\n",
      "----------------------------------\n",
      "|Germany    |33.0                |\n",
      "|Italy      |31.0                |\n",
      "|UK         |31.5                |\n",
      "|Australia  |27.5                |\n",
      "|Japan      |32.0                |\n",
      "|USA        |27.333333333333332  |\n",
      "----------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "session.sql('''SELECT\n",
    "COUNTRY,Avg_Age\n",
    "FROM SAMPLE_EMPLOYEE_DATA\n",
    ",table(AVERAGE_AGE(AGE) over (partition by COUNTRY))\n",
    "''').show()\n",
    "\n",
    "# select SAMPLE_EMPLOYEE_DATA.AGE\n",
    "#   from SAMPLE_EMPLOYEE_DATA, table(SNOWPARK_UDF_TOTAL_SPEND(AGE) over (partition by COUNTRY));"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "078a631f",
   "metadata": {},
   "source": [
    "## Working with Vectorized UDF\n",
    "\n",
    "* Section provides code templates for Snowpark Vectorized UDF\n",
    "* Includes vectorized UDF template, example and execution of vectorized UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ff56cbad",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from snowflake.snowpark.functions import pandas_udf\n",
    "from snowflake.snowpark.types import IntegerType, PandasSeriesType,StringType\n",
    "\n",
    "@pandas_udf(\n",
    "name='column_adder'\n",
    ",stage_location = '@MY_STAGE'\n",
    ",input_types=[PandasSeriesType(StringType()), PandasSeriesType(StringType())]\n",
    ",return_type=PandasSeriesType(StringType())\n",
    ",is_permanent=True\n",
    ",replace=True)\n",
    "def column_adder(column1: pd.Series, column2: pd.Series) -> pd.Series:\n",
    "  return column1 + \",\" + column2"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "1b5c0598",
   "metadata": {},
   "source": [
    "### Executing Vectorized UDF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "9c8ca4c3",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-----------------------------------------------------------------------------------------------------------------\n",
      "|\"ID\"  |\"NAME\"            |\"AGE\"  |\"EMAIL\"                      |\"CITY\"       |\"COUNTRY\"  |\"CITY_COUNTRY\"       |\n",
      "-----------------------------------------------------------------------------------------------------------------\n",
      "|1     |John Doe          |25     |johndoe@example.com          |New York     |USA        |New York,USA         |\n",
      "|2     |Jane Smith        |30     |janesmith@example.com        |Los Angeles  |USA        |Los Angeles,USA      |\n",
      "|3     |Michael Johnson   |35     |michaeljohnson@example.com   |London       |UK         |London,UK            |\n",
      "|4     |Sarah Williams    |28     |sarahwilliams@example.com    |Leeds        |UK         |Leeds,UK             |\n",
      "|5     |David Brown       |32     |davidbrown@example.com       |Tokyo        |Japan      |Tokyo,Japan          |\n",
      "|6     |Emily Davis       |29     |emilydavis@example.com       |Sydney       |Australia  |Sydney,Australia     |\n",
      "|7     |James Miller      |27     |jamesmiller@example.com      |Dallas       |USA        |Dallas,USA           |\n",
      "|8     |Emma Wilson       |33     |emmawilson@example.com       |Berlin       |Germany    |Berlin,Germany       |\n",
      "|9     |Alexander Taylor  |31     |alexandertaylor@example.com  |Rome         |Italy      |Rome,Italy           |\n",
      "|10    |Olivia Anderson   |26     |oliviaanderson@example.com   |Melbourne    |Australia  |Melbourne,Australia  |\n",
      "-----------------------------------------------------------------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = session.table(\"SAMPLE_EMPLOYEE_DATA\")\n",
    "df.withColumn('City_Country', column_adder(col('CITY'), col('COUNTRY'))).show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7f8a9193",
   "metadata": {},
   "source": [
    "## Working with Stored Procedure\n",
    "\n",
    "* Section provides code templates for Snowpark Stored Procedure\n",
    "* Includes Stored Procedure template, example and executio"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "c79e71b8",
   "metadata": {},
   "source": [
    "### Stored Procedure Template"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "844cfdad",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "\n",
    "##################################################################\n",
    "## Define the function for the Stored Procedure\n",
    "def <Python Function Name>(snowpark_session: snowflake.snowpark.Session, <arguments>):\n",
    "  return <Output>\n",
    "\n",
    "# Imports Required For Stored Procedure\n",
    "from snowflake.snowpark.types import <specific Snowpark DataType object>\n",
    "\n",
    "# Optional: Import additional packages or files\n",
    "snowpark_session.add_packages('List of native packages in Anaconda Channel')\n",
    "snowpark_session.add_import('Path to Local File')\n",
    "\n",
    "##################################################################\n",
    "## Register Stored Procedure in Snowflake\n",
    "snowpark_session.sproc.register(\n",
    "    func = <Function name to register>\n",
    "  , return_type = <Return Type of Snowpark DataType object >\n",
    "  , input_types = <[Input Types of Snowflake DataType object]>\n",
    "  , is_permanent = True\n",
    "  , name = '<Stored Procedure name>'\n",
    "  , replace = True\n",
    "  , stage_location = '@<Stored Procedure stage name>'\n",
    "  <optional: , execute_as = 'CALLER'>\n",
    ")\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "889e68df",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<snowflake.snowpark.stored_procedure.StoredProcedure at 0x1b207c5aac0>"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "##################################################################\n",
    "## Define the function for the Stored Procedure\n",
    "\n",
    "def subset_table(snowpark_session:Session):\n",
    "  df =  snowpark_session.table('SAMPLE_EMPLOYEE_DATA').select(\"NAME\",\"AGE\")\n",
    "  return df.collect()\n",
    "\n",
    "##################################################################\n",
    "## Register Stored Procedure in Snowflake\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StringType\n",
    "session.add_packages('snowflake-snowpark-python')\n",
    "\n",
    "### Upload Stored Produre to Snowflake\n",
    "session.sproc.register(\n",
    "    func = subset_table\n",
    "  , return_type = StringType()\n",
    "  , input_types = []\n",
    "  , is_permanent = True\n",
    "  , name = 'SPROC_SUBSET_TABLE'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "a66b72b2",
   "metadata": {},
   "source": [
    "### Executing Stored Procedure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "f91d1500",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------------------------\n",
      "|\"SPROC_SUBSET_TABLE\"                                |\n",
      "------------------------------------------------------\n",
      "|[Row(NAME='John Doe', AGE=25), Row(NAME='Jane S...  |\n",
      "------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(''' CALL SPROC_SUBSET_TABLE()''').show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6502d748",
   "metadata": {},
   "source": [
    "### Anonymous Stored Procedure\n",
    "\n",
    "* Section provides code templates for Snowpark Anonymous Stored Procedure \n",
    "* Includes template, example and execution of Anonymous Strored Procedure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "256a5b6a",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "WITH <name> AS PROCEDURE ( [ <arg_name> <arg_data_type> ] [ , ... ] )\n",
    "  RETURNS { <result_data_type> [ [ NOT ] NULL ] | TABLE ( [ <col_name> <col_data_type> [ , ... ] ] ) }\n",
    "  LANGUAGE PYTHON\n",
    "  RUNTIME_VERSION = '<python_version>'\n",
    "  PACKAGES = ( 'snowflake-snowpark-python[==<version>]'[, '<package_name>[==<version>]' ... ])\n",
    "  [ IMPORTS = ( '<stage_path_and_file_name_to_read>' [, '<stage_path_and_file_name_to_read>' ...] ) ]\n",
    "  HANDLER = '<function_name>'\n",
    "  [ { CALLED ON NULL INPUT | { RETURNS NULL ON NULL INPUT | STRICT } } ]\n",
    "  [ , <cte_nameN> [ ( <cte_column_list> ) ] AS ( SELECT ...  ) ]\n",
    "  AS '<procedure_definition>'\n",
    "CALL <name> ( [ [ <arg_name> => ] <arg> , ... ] )\n",
    "  [ INTO :<snowflake_scripting_variable> ]\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "00c4e5e9",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "\n",
    "WITH proc AS PROCEDURE(table_name TEXT,country TEXT)\n",
    "RETURNS TEXT\n",
    "LANGUAGE PYTHON\n",
    "RUNTIME_VERSION = '3.8'\n",
    "PACKAGES = ('snowflake-snowpark-python')\n",
    "HANDLER = 'main'\n",
    "AS\n",
    "$$\n",
    "\n",
    "from snowflake.snowpark.functions import col\n",
    "def main(session, table_name,country):\n",
    "  return session.table(table_name).filter(col(\"country\") == country).count()\n",
    "$$\n",
    "\n",
    "CALL proc('SAMPLE_EMPLOYEE_DATA','USA');\n",
    "\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "450369d1",
   "metadata": {},
   "source": [
    "# Cleanup Snowflake Objects"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "61a1d514",
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql(\"\"\" CREATE OR REPLACE DATABASE SNOWPARK_DEFINITIVE_GUIDE\"\"\").collect()\n",
    "session.sql(\"\"\" CREATE OR REPLACE SCHEMA SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA\"\"\").collect()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
